Created
October 3, 2023 20:49
-
-
Save guissalustiano/496c9172756a098335ff9e5a228be4d0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Based on https://github.com/cuducos/chunk
from asyncio import Semaphore, gather, run | |
import httpx | |
from loguru import logger | |
url = "https://dadosabertos.rfb.gov.br/CNPJ/Empresas1.zip"
# Size of each byte range requested from the server: 2**20 bytes (1 MiB).
default_chunk_size = 2**20
# Maximum number of download attempts for each chunk.
default_max_retries = 32
# Maximum number of chunks downloaded concurrently by one `download` call.
default_max_parallel = 16
# Timeout for each HTTP request, in SECONDS: httpx interprets a numeric
# timeout as seconds, so this must not be expressed in milliseconds.
default_timeout = 3 * 60  # 3 minutes
def chunk_range(content_length: int, chunk_size: int) -> list[tuple[int, int]]:
    """Split ``content_length`` bytes into inclusive ``(start, end)`` byte ranges.

    Each range covers at most ``chunk_size`` bytes (the last one may be
    shorter). Bounds are inclusive, matching the HTTP header
    ``Range: bytes=start-end``. An empty resource yields no ranges.

    >>> chunk_range(2048, 1024)
    [(0, 1023), (1024, 2047)]
    >>> chunk_range(0, 1024)
    []

    Raises:
        ValueError: if ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    # Step over start offsets directly: this avoids emitting a trailing empty
    # (start > end) range when content_length is an exact multiple of
    # chunk_size. A server would ignore such an invalid range and return the
    # whole resource.
    return [
        (start, min(start + chunk_size, content_length) - 1)
        for start in range(0, content_length, chunk_size)
    ]
# from https://stackoverflow.com/a/64283770
async def download(
    url: str,
    chunk_size: int = default_chunk_size,
    max_retries: int = default_max_retries,
    max_parallel: int = default_max_parallel,
    timeout: int = default_timeout
) -> bytes:
    """Download ``url`` in parallel byte-range chunks and return the full body.

    First, a HEAD request checks three things: the server answers 200,
    it advertises ``Accept-Ranges: bytes``, and it reports a
    ``Content-Length``. The content is then split with ``chunk_range``. The
    chunks are fetched concurrently by ``download_chunk``, at most
    ``max_parallel`` at a time, and concatenated in order.

    Args:
        url: Resource to download.
        chunk_size: Maximum number of bytes per ranged request.
        max_retries: Attempts per chunk before giving up.
        max_parallel: Maximum number of chunks downloaded concurrently.
        timeout: Per-request timeout passed to httpx (seconds).

    Returns:
        The complete response body (``b""`` for a zero-length resource).

    Raises:
        httpx.HTTPError: if the HEAD request fails or is not 200, the server
            does not support byte ranges or omits Content-Length, or a chunk
            exhausts its retries.
    """
    # Use the async client so the HEAD request does not block the event loop,
    # and give it the same timeout as the chunk requests.
    async with httpx.AsyncClient(timeout=timeout) as client:
        request_head = await client.head(url)

    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if request_head.status_code != 200:
        raise httpx.HTTPError(
            f"HEAD {url} returned status {request_head.status_code}, expected 200"
        )
    if request_head.headers.get("accept-ranges") != "bytes":
        raise httpx.HTTPError(f"{url} does not support byte-range requests")
    raw_length = request_head.headers.get("content-length")
    if raw_length is None:
        raise httpx.HTTPError(f"{url} did not report a Content-Length")
    content_length = int(raw_length)

    logger.info(f"Downloading {url} with {content_length} bytes / {chunk_size} chunks and {max_parallel} parallel downloads")
    if content_length == 0:
        # Nothing to fetch; any Range header for an empty resource would be invalid.
        return b""

    # TODO: pool http connections
    semaphore = Semaphore(max_parallel)
    tasks = [
        download_chunk(url, (start, end), max_retries, timeout, semaphore)
        for start, end in chunk_range(content_length, chunk_size)
    ]
    # gather preserves task order, so the chunks are joined in byte order.
    return b"".join(await gather(*tasks))
async def download_chunk(
    url: str,
    chunk_range: tuple[int, int],
    max_retries: int,
    timeout: int,
    semaphore: Semaphore
) -> bytes:
    """Fetch one inclusive byte range of ``url``, retrying on HTTP errors.

    Args:
        url: Resource to download from.
        chunk_range: Inclusive ``(start, end)`` byte offsets, sent as
            ``Range: bytes=start-end``.
        max_retries: Maximum number of attempts for this chunk.
        timeout: Per-request timeout passed to ``httpx.AsyncClient`` (seconds).
        semaphore: Limits how many chunks are downloaded at the same time.

    Returns:
        Exactly ``end - start + 1`` bytes. For an empty range (``end < start``)
        no request is made and ``b""`` is returned.

    Raises:
        httpx.HTTPError: once all ``max_retries`` attempts have failed; the
            last underlying error is attached as ``__cause__``.
    """
    from asyncio import sleep

    start, end = chunk_range
    expected_size = end - start + 1
    # An empty range is not a valid HTTP byte-range-spec: the server would
    # ignore the header and send the whole resource instead.
    if expected_size <= 0:
        return b""

    headers = {"Range": f"bytes={start}-{end}"}
    last_error = None
    async with semaphore:
        logger.info(f"Downloading chunk {start}-{end}")
        # One client for all attempts, so connections can be reused between retries.
        async with httpx.AsyncClient(timeout=timeout) as client:
            for i in range(max_retries):
                try:
                    response = await client.get(url, headers=headers)
                    response.raise_for_status()
                    # Guard against truncated bodies and servers that ignore
                    # Range (200 with the full resource); either would
                    # silently corrupt the assembled file.
                    if len(response.content) != expected_size:
                        raise httpx.HTTPError(
                            f"Expected {expected_size} bytes for range {start}-{end}, "
                            f"got {len(response.content)}"
                        )
                    return response.content
                except httpx.HTTPError as e:
                    last_error = e
                    logger.warning(f"Download failed with {e}. Retrying ({i+1}/{max_retries})...")
                    if i + 1 < max_retries:
                        # Exponential backoff (0.5 s, 1 s, 2 s, ... capped at
                        # 10 s) instead of retrying immediately.
                        await sleep(min(0.5 * 2**i, 10))
    raise httpx.HTTPError(f"Download failed after {max_retries} retries") from last_error
async def main():
    """Fetch the module-level ``url`` and store the bytes in ``Empresas1.zip``."""
    payload = await download(url)
    with open("Empresas1.zip", "wb") as destination:
        destination.write(payload)


if __name__ == "__main__":
    run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment