Created
December 28, 2024 01:27
-
-
Save Pymmdrza/d019a1fbeb7341f6ff65acb25ec3d3a1 to your computer and use it in GitHub Desktop.
Backup tool for any website or server: create and compress a backup, then upload the data to FTP, SFTP, S3 or R2 with Python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --------------------------------------
# Programmer : github.com/PyMmdrza
# pip install tqdm boto3 pysftp   (ftplib is part of the standard library)
# --------------------------------------
import ftplib
import os
import posixpath
import tarfile
import tempfile
import zipfile
from getpass import getpass
from io import BytesIO

import boto3
import pysftp
from tqdm import tqdm
class FileTransferScript:
    """Interactive helper that copies files between FTP, SFTP and S3.

    It can also archive the current working directory and upload the
    archive.  Every transfer method reports failures by printing a message
    instead of raising, so one bad file or one unreachable server does not
    abort the interactive session with a traceback.
    """

    # ``tarfile.open`` mode for each tar-based ``backup_format`` value.
    _TAR_MODES = {'tar.gz': 'w:gz', 'tar': 'w'}

    def __init__(self):
        # The boto3 client is created lazily by ``get_s3_client``.
        self.s3_client = None

    # ------------------------------------------------------------------
    # Prompts
    # ------------------------------------------------------------------
    def prompt_transfer_method(self):
        """Print the menu and return the chosen option as an int in 1..10.

        Keeps asking until the user enters a valid number.
        """
        print("Select the transfer service:")
        print("[1] > FTP to FTP")
        print("[2] > FTP to S3")
        print("[3] > S3 to FTP")
        print("[4] > SFTP to FTP")
        print("[5] > SFTP to SFTP")
        print("[6] > SFTP to S3")
        print("- Create and Compressed All Website file to :Single: BackupFile :")
        print("[7] > backup.tar.gz to Website FTP (another server)")
        print("[8] > backup.zip to FTP (another server)")
        print("[9] > backup.tar to SFTP (another server)")
        print("[10] > backup.tar.gz to S3 (just backup files)")
        while True:
            try:
                choice = int(input("Enter the number of your choice: "))
            except ValueError:
                print("Invalid input. Please enter a number.")
                continue
            if 1 <= choice <= 10:
                return choice
            print("Invalid choice. Please enter a number between 1 and 10.")

    def confirm_operation(self):
        """Ask for a yes/no confirmation; return True for yes, False for no."""
        while True:
            # strip() so an accidental trailing space does not reject "yes ".
            confirmation = input("Confirm the operation? (yes/no): ").strip().lower()
            if confirmation == 'yes':
                return True
            if confirmation == 'no':
                return False
            print("Invalid input. Please enter 'yes' or 'no'.")

    @staticmethod
    def _prompt_server(role, protocol):
        """Ask for host, username, password and path of an FTP/SFTP server.

        ``role`` is "source" or "destination" and ``protocol`` is "FTP" or
        "SFTP"; both only affect the prompt text.  Returns the four answers
        as a tuple in the order the transfer methods expect them.
        """
        host = input(f"Enter {role} {protocol} host: ")
        user = input(f"Enter {role} {protocol} username: ")
        # getpass keeps the password from being echoed to the terminal.
        password = getpass(f"Enter {role} {protocol} password: ")
        path = input(f"Enter {role} {protocol} path: ")
        return host, user, password, path

    @staticmethod
    def _prompt_s3(role):
        """Ask for an S3 bucket name and prefix; return ``(bucket, prefix)``."""
        bucket = input(f"Enter {role} S3 bucket name: ")
        prefix = input(f"Enter {role} S3 prefix (optional): ")
        return bucket, prefix

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _s3_key(prefix, name):
        """Join an optional S3 prefix and an object name into a key.

        An empty prefix yields just ``name`` (no leading "/"), and trailing
        slashes on the prefix do not produce a doubled "//".
        """
        prefix = (prefix or "").rstrip("/")
        return f"{prefix}/{name}" if prefix else name

    @staticmethod
    def _transfer_all(items, transfer_one, label=str):
        """Run ``transfer_one(item)`` for every item behind a progress bar.

        A failure on one item is printed (using ``label(item)`` as its name)
        and the loop continues.  Prints a summary and returns the number of
        items transferred successfully.
        """
        transferred = 0
        for item in tqdm(items, desc="Transfer Progress"):
            try:
                transfer_one(item)
                transferred += 1
            except Exception as e:
                print(f"Error transferring {label(item)}: {e}")
        print(f"Transfer completed. Transferred {transferred}/{len(items)} files.")
        return transferred

    @staticmethod
    def _build_archive(backup_format, backup_filename):
        """Archive the current directory into ``backup_filename``.

        ``backup_format`` is 'tar.gz', 'tar' or 'zip'.  Raises ``ValueError``
        for any other value, before touching the filesystem.
        """
        if backup_format in FileTransferScript._TAR_MODES:
            mode = FileTransferScript._TAR_MODES[backup_format]
            with tarfile.open(backup_filename, mode) as tar:
                tar.add('.', arcname='.')
        elif backup_format == 'zip':
            archive_path = os.path.abspath(backup_filename)
            with zipfile.ZipFile(backup_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for root, _, files in os.walk('.'):
                    for file in files:
                        path = os.path.join(root, file)
                        # The archive is written into the tree being walked;
                        # skip it so it does not end up inside itself.
                        if os.path.abspath(path) == archive_path:
                            continue
                        zipf.write(path)
        else:
            raise ValueError(f"Unsupported backup format: {backup_format}")

    def get_s3_client(self):
        """Return the boto3 S3 client, creating it on first use."""
        if self.s3_client is None:
            self.s3_client = boto3.client('s3')
        return self.s3_client

    # ------------------------------------------------------------------
    # Server-to-server transfers
    # ------------------------------------------------------------------
    def ftp_to_ftp(self, source_host, source_user, source_password, source_path, dest_host, dest_user, dest_password, dest_path):
        """Copy every entry listed in ``source_path`` to ``dest_path``.

        Each file is buffered fully in memory between download and upload.
        """
        try:
            with ftplib.FTP(source_host) as source_ftp, ftplib.FTP(dest_host) as dest_ftp:
                source_ftp.login(source_user, source_password)
                dest_ftp.login(dest_user, dest_password)
                source_ftp.cwd(source_path)
                # Without this the uploads land in the login directory and
                # dest_path is ignored.
                dest_ftp.cwd(dest_path)
                files = source_ftp.nlst()
                print(f"Transferring files from FTP: {source_host}{source_path} to FTP: {dest_host}{dest_path}")

                def copy_one(name):
                    with BytesIO() as buf:
                        source_ftp.retrbinary(f'RETR {name}', buf.write)
                        buf.seek(0)
                        dest_ftp.storbinary(f'STOR {name}', buf)

                self._transfer_all(files, copy_one)
        except Exception as e:
            print(f"FTP to FTP transfer failed: {e}")

    def ftp_to_s3(self, ftp_host, ftp_user, ftp_password, ftp_path, s3_bucket, s3_prefix):
        """Upload every entry listed in ``ftp_path`` to the S3 bucket."""
        try:
            with ftplib.FTP(ftp_host) as ftp:
                ftp.login(ftp_user, ftp_password)
                ftp.cwd(ftp_path)
                files = ftp.nlst()
                print(f"Transferring files from FTP: {ftp_host}{ftp_path} to S3: s3://{s3_bucket}/{s3_prefix}")

                def copy_one(name):
                    with BytesIO() as buf:
                        ftp.retrbinary(f'RETR {name}', buf.write)
                        buf.seek(0)
                        self.get_s3_client().upload_fileobj(buf, s3_bucket, self._s3_key(s3_prefix, name))

                self._transfer_all(files, copy_one)
        except Exception as e:
            print(f"FTP to S3 transfer failed: {e}")

    def s3_to_ftp(self, s3_bucket, s3_prefix, ftp_host, ftp_user, ftp_password, ftp_path):
        """Download every object under ``s3_prefix`` and store it on the FTP server.

        Objects are stored under the last path component of their key.
        """
        try:
            # The context manager closes the connection even when the S3 side fails.
            with ftplib.FTP(ftp_host) as ftp:
                ftp.login(ftp_user, ftp_password)
                ftp.cwd(ftp_path)
                s3 = self.get_s3_client()

                # A single list_objects_v2 call is truncated; paginate to see
                # every key under the prefix.
                keys = []
                paginator = s3.get_paginator('list_objects_v2')
                for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix):
                    for obj in page.get('Contents', []):
                        # Keys ending in "/" are folder placeholders with no
                        # file name to store under.
                        if posixpath.basename(obj['Key']):
                            keys.append(obj['Key'])

                if not keys:
                    print(f"No files found in S3 bucket: s3://{s3_bucket}/{s3_prefix}")
                    return

                print(f"Transferring files from S3: s3://{s3_bucket}/{s3_prefix} to FTP: {ftp_host}{ftp_path}")

                def copy_one(s3_key):
                    with BytesIO() as buf:
                        s3.download_fileobj(s3_bucket, s3_key, buf)
                        buf.seek(0)
                        ftp.storbinary(f'STOR {posixpath.basename(s3_key)}', buf)

                self._transfer_all(keys, copy_one, label=posixpath.basename)
        except Exception as e:
            print(f"S3 to FTP transfer failed: {e}")

    def sftp_to_ftp(self, sftp_host, sftp_user, sftp_password, sftp_path, ftp_host, ftp_user, ftp_password, ftp_path):
        """Stream every entry in ``sftp_path`` to ``ftp_path`` on the FTP server."""
        try:
            with pysftp.Connection(sftp_host, username=sftp_user, password=sftp_password) as sftp, \
                    ftplib.FTP(ftp_host) as ftp:
                ftp.login(ftp_user, ftp_password)
                ftp.cwd(ftp_path)
                with sftp.cd(sftp_path):
                    files = sftp.listdir()
                    print(f"Transferring files from SFTP: {sftp_host}{sftp_path} to FTP: {ftp_host}{ftp_path}")

                    def copy_one(name):
                        with sftp.open(name, 'rb') as sftp_file:
                            ftp.storbinary(f'STOR {name}', sftp_file)

                    self._transfer_all(files, copy_one)
        except Exception as e:
            print(f"SFTP to FTP transfer failed: {e}")

    def sftp_to_sftp(self, source_host, source_user, source_password, source_path, dest_host, dest_user, dest_password, dest_path):
        """Copy every entry in ``source_path`` to ``dest_path`` on another SFTP server.

        Files are staged one at a time in a private temporary directory.
        """
        try:
            with pysftp.Connection(source_host, username=source_user, password=source_password) as source_sftp, \
                    pysftp.Connection(dest_host, username=dest_user, password=dest_password) as dest_sftp:
                # Create the directory on the DESTINATION server; pysftp's
                # makedirs takes no exist_ok argument.
                if dest_path:
                    dest_sftp.makedirs(dest_path)
                with source_sftp.cd(source_path), tempfile.TemporaryDirectory() as staging:
                    files = source_sftp.listdir()
                    print(f"Transferring files from SFTP: {source_host}{source_path} to SFTP: {dest_host}{dest_path}")

                    def copy_one(name):
                        local_path = os.path.join(staging, name)
                        source_sftp.get(name, local_path)
                        try:
                            # Remote paths always use "/", whatever the local OS.
                            dest_sftp.put(local_path, posixpath.join(dest_path, name))
                        finally:
                            # Drop the staged copy right away to bound disk use.
                            os.remove(local_path)

                    self._transfer_all(files, copy_one)
        except Exception as e:
            print(f"SFTP to SFTP transfer failed: {e}")

    def sftp_to_s3(self, sftp_host, sftp_user, sftp_password, sftp_path, s3_bucket, s3_prefix):
        """Stream every entry in ``sftp_path`` to the S3 bucket."""
        try:
            with pysftp.Connection(sftp_host, username=sftp_user, password=sftp_password) as sftp:
                with sftp.cd(sftp_path):
                    files = sftp.listdir()
                    print(f"Transferring files from SFTP: {sftp_host}{sftp_path} to S3: s3://{s3_bucket}/{s3_prefix}")

                    def copy_one(name):
                        with sftp.open(name, 'rb') as remote_file:
                            self.get_s3_client().upload_fileobj(remote_file, s3_bucket, self._s3_key(s3_prefix, name))

                    self._transfer_all(files, copy_one)
        except Exception as e:
            print(f"SFTP to S3 transfer failed: {e}")

    # ------------------------------------------------------------------
    # Backup-and-upload
    # ------------------------------------------------------------------
    def backup_to_ftp(self, backup_format, backup_filename, ftp_host, ftp_user, ftp_password, ftp_path):
        """Archive the current directory and upload the archive over FTP.

        The local archive is deleted after a successful upload.
        """
        try:
            self._build_archive(backup_format, backup_filename)
            with ftplib.FTP(ftp_host) as ftp:
                ftp.login(ftp_user, ftp_password)
                ftp.cwd(ftp_path)
                print(f"Uploading {backup_filename} to FTP: {ftp_host}{ftp_path}")
                with open(backup_filename, 'rb') as file:
                    ftp.storbinary(f'STOR {backup_filename}', file)
            print(f"Backup {backup_filename} uploaded successfully.")
            os.remove(backup_filename)  # Clean up the backup file
        except Exception as e:
            print(f"Backup and FTP upload failed: {e}")

    def backup_to_sftp(self, backup_format, backup_filename, sftp_host, sftp_user, sftp_password, sftp_path):
        """Archive the current directory and upload the archive over SFTP.

        The local archive is deleted after a successful upload.
        """
        try:
            self._build_archive(backup_format, backup_filename)
            with pysftp.Connection(sftp_host, username=sftp_user, password=sftp_password) as sftp:
                # pysftp's makedirs takes no exist_ok argument.
                if sftp_path:
                    sftp.makedirs(sftp_path)
                remote_filepath = posixpath.join(sftp_path, backup_filename)
                print(f"Uploading {backup_filename} to SFTP: {sftp_host}{sftp_path}")
                sftp.put(backup_filename, remote_filepath)
            print(f"Backup {backup_filename} uploaded successfully.")
            os.remove(backup_filename)  # Clean up the backup file
        except Exception as e:
            print(f"Backup and SFTP upload failed: {e}")

    def backup_to_s3(self, backup_format, backup_filename, s3_bucket, s3_prefix):
        """Archive the current directory and upload the archive to S3.

        The local archive is deleted after a successful upload.
        """
        try:
            self._build_archive(backup_format, backup_filename)
            print(f"Uploading {backup_filename} to S3: s3://{s3_bucket}/{s3_prefix}")
            self.get_s3_client().upload_file(backup_filename, s3_bucket, self._s3_key(s3_prefix, backup_filename))
            print(f"Backup {backup_filename} uploaded successfully.")
            os.remove(backup_filename)  # Clean up the backup file
        except Exception as e:
            print(f"Backup and S3 upload failed: {e}")

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def run(self):
        """Show the menu, confirm, collect connection details and dispatch.

        Arguments are unpacked left to right, so the prompts appear in the
        same order as the parameters of the method being called.
        """
        choice = self.prompt_transfer_method()
        if not self.confirm_operation():
            print("Operation cancelled.")
            return
        if choice == 1:
            self.ftp_to_ftp(*self._prompt_server("source", "FTP"), *self._prompt_server("destination", "FTP"))
        elif choice == 2:
            self.ftp_to_s3(*self._prompt_server("source", "FTP"), *self._prompt_s3("destination"))
        elif choice == 3:
            self.s3_to_ftp(*self._prompt_s3("source"), *self._prompt_server("destination", "FTP"))
        elif choice == 4:
            self.sftp_to_ftp(*self._prompt_server("source", "SFTP"), *self._prompt_server("destination", "FTP"))
        elif choice == 5:
            self.sftp_to_sftp(*self._prompt_server("source", "SFTP"), *self._prompt_server("destination", "SFTP"))
        elif choice == 6:
            self.sftp_to_s3(*self._prompt_server("source", "SFTP"), *self._prompt_s3("destination"))
        elif choice == 7:
            self.backup_to_ftp('tar.gz', 'backup.tar.gz', *self._prompt_server("destination", "FTP"))
        elif choice == 8:
            self.backup_to_ftp('zip', 'backup.zip', *self._prompt_server("destination", "FTP"))
        elif choice == 9:
            self.backup_to_sftp('tar', 'backup.tar', *self._prompt_server("destination", "SFTP"))
        elif choice == 10:
            self.backup_to_s3('tar.gz', 'backup.tar.gz', *self._prompt_s3("destination"))
if __name__ == "__main__":
    # Script entry point: build the tool and run one interactive session.
    transfer_script = FileTransferScript()
    try:
        transfer_script.run()
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C / Ctrl-D at any prompt ends the session without a traceback.
        print("\nOperation cancelled.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment