Created
April 7, 2023 17:21
-
-
Save dan0nchik/8959c9000d1ff04e41ac5845dd3281c0 to your computer and use it in GitHub Desktop.
[EXPERIMENTAL] Download files in parallel from S3 Storage using boto3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from asyncio import as_completed | |
from functools import partial | |
import os | |
import boto3 | |
import tqdm | |
from concurrent.futures.thread import ThreadPoolExecutor | |
from dotenv import load_dotenv | |
# Load credentials from a local .env file into the process environment
# before any os.getenv() lookups below.
load_dotenv()

# Local destination directory and the remote bucket layout to mirror.
LOCAL_WEIGHTS_DIR = 'model-weights-skills'
S3_FOLDER = 'skills/'
BUCKET_NAME = 'model-weights'

# S3 credentials/endpoint, populated by load_dotenv() above.
MODEL_WEIGHTS_S3_ACCESS_KEY = os.getenv('MODEL_WEIGHTS_S3_ACCESS_KEY')
MODEL_WEIGHTS_S3_SECRET_KEY = os.getenv('MODEL_WEIGHTS_S3_SECRET_KEY')
MODEL_WEIGHTS_S3_URL = os.getenv('MODEL_WEIGHTS_S3_URL')
def download_one_file(bucket: str, output: str, s3_file: str):
    """Download a single S3 object into the *output* directory.

    Each call builds its own boto3 Session/client because sessions are not
    thread-safe and this function runs concurrently in a thread pool.

    Args:
        bucket: Name of the S3 bucket to download from.
        output: Local directory the file is written into (must exist).
        s3_file: Full object key, e.g. ``skills/model.bin``.

    Returns:
        A ``('Downloaded', s3_file)`` tuple on success.
    """
    session = boto3.Session()
    client = session.client("s3",
                            endpoint_url='https://s3.timeweb.com',
                            region_name='ru-1',
                            aws_access_key_id=MODEL_WEIGHTS_S3_ACCESS_KEY,
                            aws_secret_access_key=MODEL_WEIGHTS_S3_SECRET_KEY)
    # Bug fix: the original `path, filename = s3_file.split('/')` raised
    # ValueError for any key without exactly one '/' (nested prefixes or
    # root-level keys). Take everything after the last '/' instead.
    filename = s3_file.rsplit('/', 1)[-1]
    client.download_file(bucket, s3_file, os.path.join(output, filename))
    return 'Downloaded', s3_file
def main():
    """List every object under S3_FOLDER and download them in parallel.

    Failed downloads are collected (instead of aborting the whole run) and
    reported at the end so they can be retried later.
    """
    # Listing files to download, filtered by the folder prefix.
    s3 = boto3.resource(
        's3',
        endpoint_url='https://s3.timeweb.com',
        region_name='ru-1',
        aws_access_key_id=MODEL_WEIGHTS_S3_ACCESS_KEY,
        aws_secret_access_key=MODEL_WEIGHTS_S3_SECRET_KEY)
    bucket = s3.Bucket(BUCKET_NAME)
    files_to_download = [obj.key for obj in bucket.objects.filter(Prefix=S3_FOLDER)]

    # List for storing failed downloads to retry later.
    failed_downloads = []

    if not os.path.exists(LOCAL_WEIGHTS_DIR):
        os.mkdir(LOCAL_WEIGHTS_DIR)

    with ThreadPoolExecutor(max_workers=4) as executor:
        # Map each future back to its key so a failure can be recorded.
        future_to_key = {
            executor.submit(download_one_file,
                            bucket=BUCKET_NAME,
                            output=LOCAL_WEIGHTS_DIR,
                            s3_file=key): key
            for key in files_to_download
        }
        for future, key in future_to_key.items():
            # Bug fix: the original called future.result() unguarded, so a
            # single failed download crashed the loop and failed_downloads
            # (declared and reported below) could never be populated.
            try:
                print('Done', future.result())
            except Exception as exc:
                failed_downloads.append(key)
                print('Failed', key, exc)

    if len(failed_downloads) > 0:
        print("Some downloads have failed:")
        print(failed_downloads)

if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment