Last active
December 31, 2024 23:20
-
-
Save Pymmdrza/3e3f30b16f4009503e7572b914fd200d to your computer and use it in GitHub Desktop.
Upload a file to Cloudflare R2 with boto3 in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://gist.github.com/Pymmdrza/3e3f30b16f4009503e7572b914fd200d | |
# Install packages from requirements.txt | |
# Windows : pip install -r requirements.txt | |
# Linux : pip3 install -r requirements.txt | |
import os | |
import boto3 | |
from tqdm import tqdm | |
from colorthon import Colors as Fore | |
def get_user_input(prompt, example=''):
    """Prompt on stdin and return the reply without surrounding whitespace.

    Args:
        prompt: Label printed in front of the input cursor.
        example: Optional sample value shown as an "e.x:" hint. When it is
            empty the hint is omitted, so the prompt does not end in an
            empty "[e.x: ]" placeholder.

    Returns:
        The text typed by the user, stripped of leading/trailing whitespace.
    """
    label = f"{Fore.GREEN}{prompt}{Fore.RESET}"
    if example:
        label += f" [{Fore.GREY}e.x: {example}{Fore.RESET}]"
    return input(f"{label}: ").strip()
def validate_input(input_value, expected_length, message):
    """Stop the program unless *input_value* has exactly *expected_length* characters.

    Args:
        input_value: The string to check; an empty or missing value is rejected.
        expected_length: The exact length the value must have.
        message: Text printed (followed by " Exiting...") when the check fails.

    Raises:
        SystemExit: With exit status 1 when the value is empty or its length
            differs from *expected_length*.
    """
    if input_value and len(input_value) == expected_length:
        return
    print(f"\n{Fore.RED}{message} Exiting...{Fore.RESET}")
    # The bare ``exit`` name is injected by the ``site`` module for interactive
    # sessions and is not guaranteed to exist; raising SystemExit always works.
    raise SystemExit(1)
def upload_to_r2_cloudflare(endpoint_url, access_key_id, secret_key, region_name, remote_folder, local_file):
    """Send one local file through an S3 client while showing a progress bar.

    Args:
        endpoint_url: Forwarded to ``boto3.client`` as ``endpoint_url``.
        access_key_id: Forwarded as ``aws_access_key_id``.
        secret_key: Forwarded as ``aws_secret_access_key``.
        region_name: Forwarded as ``region_name``.
        remote_folder: Passed as the second positional argument of
            ``upload_fileobj``. NOTE(review): in boto3's signature that
            position is the bucket, not a key prefix - confirm the intent.
        local_file: Path of the file to upload; its base name is passed as
            the third positional argument of ``upload_fileobj``.
    """
    client_options = {
        'endpoint_url': endpoint_url,
        'aws_access_key_id': access_key_id,
        'aws_secret_access_key': secret_key,
        'region_name': region_name,
    }
    client = boto3.client('s3', **client_options)

    object_name = os.path.basename(local_file)
    total_bytes = os.path.getsize(local_file)
    print(f"File Name: {Fore.GREY}{object_name}{Fore.RESET}")
    size_mb = round(total_bytes / 1024 / 1024, 2)
    print(f"File Size: {Fore.MAGENTA}{size_mb} MB{Fore.RESET}")

    progress = tqdm(total=total_bytes, unit='B', unit_scale=True,
                    desc=f'Uploading {object_name} to R2', ascii=True)
    # The bar is entered before the file is opened, as in the original nesting.
    with progress as bar, open(local_file, 'rb') as stream:
        # The callback receives a byte count, which advances the bar directly.
        client.upload_fileobj(stream, remote_folder, object_name, Callback=bar.update)
def main():
    """Ask for R2 credentials and an upload target on stdin, then upload one file.

    The account ID, access key and secret key are length-checked with
    ``validate_input`` (32, 32 and 64 characters), which terminates the
    program on failure. An empty region reply falls back to ``'auto'``,
    the default advertised in the prompt.
    """
    print(f"\n{Fore.GREEN}Enter Account ID R2 Cloudflare:")
    account_id = get_user_input('ACCOUNT ID', '5bdab1c198a...1c9b134')
    print(f"\n{Fore.GREEN}Enter Access Key R2 Cloudflare (API/AWS Access Key):")
    access_key = get_user_input('ACCESS KEY ID', '2bddb1c1...b17d1c934')
    print(f"\n{Fore.GREEN}Enter Secret Key R2 Cloudflare (API/AWS Secret Key):")
    secret_key = get_user_input('SECRET KEY', 'af81a81c...4696f')
    print(f"\n{Fore.GREEN}Enter Region R2 Cloudflare (Default: auto):")
    # Pressing Enter must select the advertised default instead of passing
    # an empty region name on to the client.
    reg_name = get_user_input('REGION NAME (Default: auto)', 'auto') or 'auto'

    validate_input(account_id, 32, 'Missing or invalid Account ID')
    validate_input(access_key, 32, 'Missing or invalid Access Key ID')
    validate_input(secret_key, 64, 'Missing or invalid Secret Key')

    endpoint_url = f'https://{account_id}.r2.cloudflarestorage.com'

    print(f"{Fore.GREEN}Remote Folder Path on R2 bucket :{Fore.RESET}")
    remote_folder = get_user_input('Remote Folder Path on R2 bucket')
    print(f"\n\n{Fore.CYAN}Upload File To R2 Cloudflare Storage Bucket{Fore.RESET}\n\n")
    file_path = get_user_input('File Name or Path Full')
    print(f"{Fore.GREEN}Folder Path:{Fore.RESET} {Fore.YELLOW}{remote_folder}{Fore.RESET}")

    upload_to_r2_cloudflare(endpoint_url, access_key, secret_key, reg_name, remote_folder, file_path)
    print(f"\n\n{Fore.GREEN}Upload Completed{Fore.RESET}\n\n")


# Run the interactive flow only when executed as a script, not on import.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://gist.github.com/Pymmdrza/3e3f30b16f4009503e7572b914fd200d | |
# Install packages from requirements.txt | |
# Windows : pip install -r requirements.txt | |
# Linux : pip3 install -r requirements.txt | |
import boto3, os | |
from tqdm import tqdm | |
from colorthon import Colors as Fore | |
# Collect the R2 credentials interactively. Replies are stripped so that
# whitespace picked up when pasting a key does not fail the length checks.
print(f"\n{Fore.GREEN}Enter Account ID R2 Cloudflare:{Fore.RESET}")
print(f"{Fore.GREY}[Example] Account ID (32 Character): 5bdab1c198a...1c9b134{Fore.RESET}")
print(f"{Fore.GREY}[Example] Endpoint URL : https://5bdab1c198a...1c9b134.r2.cloudflarestorage.com{Fore.RESET}")
account_id = input('ACCOUNT ID >> ').strip()
print(f"\n{Fore.GREEN}Enter Access Key R2 Cloudflare (API/AWS Access Key):{Fore.RESET}")
print(f"{Fore.GREY}[Example] Access Key ID (32 Character) : 2bddb1c1...b17d1c934{Fore.RESET}")
access_key = input('ACCESS KEY ID >> ').strip()
print(f"\n{Fore.GREEN}Enter Secret Key R2 Cloudflare (API/AWS Secret Key):{Fore.RESET}")
print(f"{Fore.GREY}[Example] Secret Key (64 Character) : af81a81c...4696f{Fore.RESET}")
secret_key = input('SECRET KEY >> ').strip()
print(f"\n{Fore.GREEN}Enter Region R2 Cloudflare (Default: auto):{Fore.RESET}")
# An empty reply selects the advertised default region.
reg_name = input('REGION NAME (Default: auto) >> ').strip() or 'auto'

# // data input check //
# Each credential must be present and have its exact expected length.
for _value, _expected_length, _label in (
    (account_id, 32, 'Account ID'),
    (access_key, 32, 'Access Key'),
    (secret_key, 64, 'Secret Key'),
):
    if len(_value) != _expected_length:
        print(f"\n{Fore.RED}Missing {_label} Exiting...{Fore.RESET}")
        # Exit with a non-zero status so a failed validation is not reported
        # as success; SystemExit avoids relying on the site-provided exit().
        raise SystemExit(1)

# // r2 cloudflare data //
# Keyword arguments later handed to boto3.client by R2Upload.
r2data = {
    'endpoint_url': f'https://{account_id}.r2.cloudflarestorage.com',
    'aws_access_key_id': access_key,
    'aws_secret_access_key': secret_key,
    'region_name': reg_name,
}
# upload file to r2 cloudflare storage | |
def R2Upload(remoteFolder, localFile):
    """Upload *localFile* with an S3 client built from the module-level ``r2data``.

    Args:
        remoteFolder: Passed as the second positional argument of
            ``upload_fileobj``. NOTE(review): boto3 treats that position as
            the bucket, not a key prefix - confirm the intent.
        localFile: Path of the file to upload; its base name is passed as
            the third positional argument of ``upload_fileobj``.
    """
    print(f"Create {Fore.CYAN}S3{Fore.RESET} Client ({Fore.GREEN}R2{Fore.RESET})")
    endpoint = r2data['endpoint_url']
    key_id = r2data['aws_access_key_id']
    secret = r2data['aws_secret_access_key']
    region = r2data['region_name']
    client = boto3.client(
        's3',
        endpoint_url=endpoint,
        aws_access_key_id=key_id,
        aws_secret_access_key=secret,
        region_name=region,
    )

    object_name = os.path.basename(localFile)
    total_bytes = os.path.getsize(localFile)
    print(f"File Name : {Fore.GREY}{object_name}{Fore.RESET}")
    size_mb = round(total_bytes / 1024 / 1024, 2)
    print(f"File Size : {Fore.MAGENTA}{size_mb}{Fore.RESET} MB")

    progress = tqdm(total=total_bytes, unit='B', unit_scale=True,
                    desc=f'Uploading {object_name} to R2', ascii=True)
    # The bar is entered before the file is opened, as in the original nesting.
    with progress as bar, open(localFile, 'rb') as stream:
        # The callback receives a byte count, which advances the bar directly.
        client.upload_fileobj(stream, remoteFolder, object_name, Callback=bar.update)
# Banner, then ask for the local file and the remote target before uploading.
print(f"\n\n\n{Fore.CYAN}Upload File To R2 Cloudflare Storage Bucket{Fore.RESET}\n\n\n")

print(f"{Fore.GREEN}File Name or Path Full :{Fore.RESET}")
fileup = input('> ')  # input() already returns str

print(f"{Fore.GREEN}Remote Folder Path on R2 bucket :{Fore.RESET}")
folder_path = input('> ')

# Echo the chosen remote target back before the transfer starts.
print(f"{Fore.GREEN}Folder Path :{Fore.RESET} {Fore.YELLOW}{folder_path}{Fore.RESET}")
R2Upload(folder_path, fileup)

print(f"\n\n\n{Fore.GREEN}Upload Completed{Fore.RESET}\n\n\n")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
boto3 | |
tqdm | |
colorthon |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
R2 Cloudflare Upload Script with boto3 in Python

Use & Download
R2 Cloudflare Files: Download

Install Packages with pip or pip3
After extracting, install the required libraries from the requirements.txt file with the following command.
With pip: pip install -r requirements.txt
With pip3: pip3 install -r requirements.txt
Direct install with pip: pip install boto3 tqdm colorthon
Direct install with pip3: pip3 install boto3 tqdm colorthon