Skip to content

Instantly share code, notes, and snippets.

@arky
Created April 16, 2026 09:54
Show Gist options
  • Select an option

  • Save arky/e940f3a14734ca9b21e8ff52421e994d to your computer and use it in GitHub Desktop.

Select an option

Save arky/e940f3a14734ca9b21e8ff52421e994d to your computer and use it in GitHub Desktop.
BulkFetch allows you to batch-download files
import csv
import html
import io
import re
import zipfile
from datetime import date, datetime
from urllib.parse import unquote, urlparse

import requests
from shiny import reactive
from shiny.express import input, render, ui
from shinyswatch import theme
# Maximum number of URLs accepted in a single download request; the download
# handler shows an error modal instead of fetching when this is exceeded.
DOWNLOAD_LIMIT = 100
def _filename_from_response(response: "requests.Response", url: str) -> str:
    """Derive a safe filename from Content-Disposition, falling back to the URL path.

    Candidates are tried in this order and the first usable one wins:

    1. the RFC 5987 ``filename*=`` parameter (percent-decoded),
    2. the plain ``filename=`` parameter (quoted or bare),
    3. the last segment of the URL path.

    Every candidate is reduced to its final path component, so a server
    cannot smuggle directory parts (``../``, ``\\``) into the archive.

    Args:
        response: Object exposing a ``headers`` mapping (a requests response).
        url: The URL that was fetched; used when the header gives no name.

    Returns:
        A non-empty filename without directory components, or
        ``"unnamed_file"`` when nothing usable was found.
    """

    def _basename(raw: str) -> str:
        # Treat both separators as directory boundaries and keep the last part.
        name = raw.replace("\\", "/").rsplit("/", 1)[-1].strip()
        return "" if name in {".", ".."} else name

    candidates = []
    cd = response.headers.get("Content-Disposition", "")
    if cd:
        # RFC 5987 encoded: filename*=UTF-8'lang'name%20here.pdf (takes priority)
        m = re.search(
            r"filename\*\s*=\s*(?:[^';\s]*'[^';\s]*')?([^\s;]+)", cd, re.IGNORECASE
        )
        if m:
            candidates.append(unquote(m.group(1).strip(" \"'")))
        # Plain: filename="name with spaces.pdf", filename='name.pdf' or filename=name.pdf
        m = re.search(
            r'''filename\s*=\s*(?:"([^"]*)"|'([^']*)'|([^\s;]+))''', cd, re.IGNORECASE
        )
        if m:
            raw = next(group for group in m.groups() if group is not None)
            candidates.append(unquote(raw.strip(" \"'")))
    candidates.append(urlparse(url).path.split("/")[-1])

    for candidate in candidates:
        name = _basename(candidate)
        if name:
            return name
    return "unnamed_file"
# Per-URL results of the most recent download run; written by the download
# handler and read by the download log view.
log_entries = reactive.value([]) # list of {url, filename, status, message}
def _read_uploaded(file_info) -> str | None:
if not file_info:
return None
with open(file_info[0]["datapath"], "r", encoding="utf-8", errors="replace") as f:
return f.read()
def _get_csv_fields(text: str) -> list[str]:
    """Return the header names of CSV *text*; an empty list if none can be read."""
    try:
        header = csv.DictReader(io.StringIO(text.strip())).fieldnames
        return list(header) if header else []
    except Exception:
        # Best effort: any failure while reading the header means "no columns".
        return []
def _extract_urls_from_csv(text: str, col: str) -> list[str]:
    """Return the non-empty, stripped values of column *col* from CSV *text*.

    Args:
        text: CSV content whose first row is the header.
        col: Name of the column holding the URLs.

    Returns:
        The column's values in row order, with surrounding whitespace removed
        and blank cells skipped. Rows too short to reach the column, and a
        column name missing from the header, contribute nothing.
    """
    reader = csv.DictReader(io.StringIO(text.strip()))
    # DictReader fills cells missing from a short row with None, so normalise
    # to "" before stripping instead of calling .strip() on None.
    values = ((row.get(col) or "").strip() for row in reader)
    return [value for value in values if value]
@reactive.calc
def _active_text():
    """Return ``(text, is_csv)`` for the input source currently in effect.

    A non-empty uploaded file takes precedence; otherwise the contents of the
    pasted-URLs text area are used.
    """
    file_text = _read_uploaded(input.csv_file())
    if file_text:
        return file_text, True
    return input.urls(), False
@reactive.calc
def _current_urls():
    """Return the list of URLs from the active input source.

    Pasted text is split on whitespace. For an uploaded CSV the values of the
    selected URL column are returned; if no column is available yet, the
    result is an empty list.
    """
    text, is_csv = _active_text()
    if not (text and text.strip()):
        return []
    if not is_csv:
        # str.split() with no arguments never yields empty strings.
        return text.split()
    try:
        column = input.url_col()
    except Exception:
        # The column selector may not be available yet.
        column = None
    return _extract_urls_from_csv(text, column) if column else []
# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
# Page-level options: browser title and the "darkly" theme from shinyswatch.
ui.page_opts(title="BulkFetch", theme=theme.darkly)
# Layout: a horizontally centred wrapper, 70% wide, holding a single card.
with ui.div(class_="d-flex justify-content-center pt-5"):
with ui.div(style="width:70%"):
with ui.card():
#ui.card_header("BulkFetch")
with ui.card_body():
# File upload input "csv_file": a single file with a .csv or .txt extension.
ui.input_file("csv_file", "Upload CSV file", accept=[".csv", ".txt"], multiple=False)
@render.ui
def col_selector_ui():
    """Render the URL-column selector for the uploaded CSV.

    Returns:
        Empty HTML when no file is uploaded; an error paragraph when the file
        has no readable header; otherwise the ``url_col`` select input,
        preceded by a warning when no "Attachments" column exists.
    """
    file_info = input.csv_file()
    if not file_info:
        return ui.HTML("")
    uploaded = _read_uploaded(file_info)
    fields = _get_csv_fields(uploaded) if uploaded else []
    if not fields:
        return ui.p("No columns found in uploaded file.", class_="text-danger small")

    has_default = "Attachments" in fields
    selector = ui.input_select(
        "url_col", "URL column",
        choices={f: f for f in fields},
        selected="Attachments" if has_default else fields[0],
    )
    if has_default:
        return selector
    # Only the return value of a @render.ui function is rendered, so the
    # warning has to be returned together with the selector; a bare
    # ui.p(...) statement here would be discarded.
    return ui.TagList(
        ui.p(
            '"Attachments" column not found — please select the URL column below.',
            class_="text-warning small",
        ),
        selector,
    )
# Alternative input: pasted URLs, used only when no uploaded file provides
# text. The pasted text is split on any whitespace, so space-separated URLs
# are accepted as well as one per line.
ui.p("— or paste URLs below (one per line) —", class_="text-muted small")
ui.input_text_area(
"urls", "", rows=6, width="100%",
placeholder="https://example.com/file1.zip\nhttps://example.com/file2.pdf",
)
@render.ui
def url_preview():
    """Render a numbered preview table of the URLs that are ready to download.

    Returns:
        A muted placeholder paragraph when there are no URLs; otherwise a
        scrollable HTML table with one row per URL.
    """
    urls = _current_urls()
    if not urls:
        return ui.p("No URLs loaded yet.", class_="text-muted small")
    # The URLs are user-supplied text (pasted or from a CSV) and end up in
    # raw HTML, so they must be escaped to avoid markup/script injection.
    rows = "".join(
        f"<tr>"
        f"<td class='text-muted'>{position}</td>"
        f"<td class='small text-break'>{html.escape(url)}</td>"
        f"</tr>"
        for position, url in enumerate(urls, start=1)
    )
    return ui.HTML(
        f'<p class="small text-muted mb-1">{len(urls)} URL(s) ready to download</p>'
        f'<div class="overflow-auto" style="max-height:220px">'
        f'<table class="table table-sm table-striped mb-0">'
        f'<thead><tr><th>#</th><th>URL</th></tr></thead>'
        f'<tbody>{rows}</tbody>'
        f'</table></div>'
    )
def _unique_zip_name(name: str, used: set[str]) -> str:
    """Return *name*, suffixed with ``_2``, ``_3``, … if taken, and record it in *used*."""
    stem, dot, ext = name.rpartition(".")
    if not stem:
        # No extension (or a dotfile): append the counter to the whole name.
        stem, dot, ext = name, "", ""
    candidate, attempt = name, 1
    while candidate in used:
        attempt += 1
        candidate = f"{stem}_{attempt}{dot}{ext}"
    used.add(candidate)
    return candidate


@render.download(
    # One clock read, so the date and time parts cannot straddle midnight.
    filename=lambda: f"{datetime.now().strftime('%Y-%m-%dT%H%M%S')}_downloaded_files.zip"
)
def download():
    """Fetch every current URL and yield a ZIP archive of the results.

    Each URL is requested with a 10-second timeout. A successful response is
    stored under a name derived from the response; a failed request is
    recorded as a ``<name>_error.txt`` entry holding the error text. Entry
    names are made unique so two URLs with the same filename do not collide.
    The per-URL outcome is published to ``log_entries``.

    Yields:
        The bytes of the ZIP archive. Nothing is yielded when there are no
        URLs or more than ``DOWNLOAD_LIMIT``; an error modal is shown instead.
    """
    urls = _current_urls()
    # NOTE(review): on the two validation paths below the handler yields
    # nothing — confirm how the client treats the resulting empty download.
    if not urls:
        ui.modal_show(ui.modal(
            "Please upload a CSV or enter at least one URL.",
            title="Error: No URLs provided.",
            easy_close=True, footer=None,
        ))
        return
    if len(urls) > DOWNLOAD_LIMIT:
        ui.modal_show(ui.modal(
            f"Please limit your input to a maximum of {DOWNLOAD_LIMIT} URLs.",
            title="Error: Too many URLs.",
            easy_close=True, footer=None,
        ))
        return

    total = len(urls)
    entries = []
    used_names: set[str] = set()
    zip_buffer = io.BytesIO()
    with ui.Progress(min=0, max=total + 1) as p:
        p.set(0, message="Parsing URLs…", detail=f"{total} link(s) found")
        with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
            for i, url in enumerate(urls):
                url_filename = urlparse(url).path.split("/")[-1] or "unnamed_file"
                p.set(i, message=f"Downloading {i + 1} / {total}", detail=url_filename)
                try:
                    response = requests.get(url, timeout=10)
                    response.raise_for_status()
                except requests.exceptions.RequestException as e:
                    # Keep a record of the failure inside the archive as well.
                    error_name = _unique_zip_name(f"{url_filename}_error.txt", used_names)
                    zip_file.writestr(error_name, f"Failed: {e}")
                    entries.append({
                        "url": url, "filename": url_filename,
                        "status": "error", "message": str(e),
                    })
                    continue
                filename = _unique_zip_name(
                    _filename_from_response(response, url), used_names
                )
                zip_file.writestr(filename, response.content)
                entries.append({
                    "url": url, "filename": filename,
                    "status": "ok", "message": f"{len(response.content):,} bytes",
                })
        p.set(total, message="Packing ZIP…", detail="Almost done")
    log_entries.set(entries)
    zip_buffer.seek(0)
    yield zip_buffer.getvalue()
with ui.card_footer():
    @render.ui
    def download_log():
        """Render a results table for the most recent download run.

        Returns:
            Empty HTML when there are no log entries; otherwise a summary line
            and a table with one row per URL (file name, status badge, details).
        """
        entries = log_entries.get()
        if not entries:
            return ui.HTML("")

        def _row(position: int, entry: dict) -> str:
            succeeded = entry["status"] == "ok"
            badge = "bg-success" if succeeded else "bg-danger"
            label = "OK" if succeeded else "Error"
            # File names and messages originate from remote servers, URLs and
            # exception text, so escape them for both text and attribute
            # context (html.escape also escapes single and double quotes).
            filename = html.escape(str(entry["filename"]))
            message = html.escape(str(entry["message"]))
            return (
                f"<tr>"
                f"<td class='text-muted'>{position}</td>"
                f"<td class='small text-truncate' style='max-width:180px' title='{filename}'>{filename}</td>"
                f"<td><span class='badge {badge}'>{label}</span></td>"
                f"<td class='small text-muted'>{message}</td>"
                f"</tr>"
            )

        rows = "".join(_row(i, e) for i, e in enumerate(entries, start=1))
        ok = sum(1 for e in entries if e["status"] == "ok")
        fail = len(entries) - ok
        return ui.HTML(
            f'<p class="small text-muted mb-1">{ok} downloaded, {fail} failed</p>'
            f'<div class="overflow-auto">'
            f'<table class="table table-sm">'
            f'<thead><tr><th>#</th><th>File</th><th>Status</th><th>Details</th></tr></thead>'
            f'<tbody>{rows}</tbody>'
            f'</table></div>'
        )
# ---------------------------------------------------------------------------
# Reactive effects
# ---------------------------------------------------------------------------
@reactive.effect
def _update_col_select():
    """Refresh the ``url_col`` select with the uploaded file's column names.

    Does nothing when no file text is available or no header can be read.
    "Attachments" is preselected when present, otherwise the first column.
    """
    uploaded = _read_uploaded(input.csv_file())
    fields = _get_csv_fields(uploaded) if uploaded else []
    if fields:
        preselected = fields[0] if "Attachments" not in fields else "Attachments"
        ui.update_select(
            "url_col",
            choices=dict(zip(fields, fields)),
            selected=preselected,
        )
shiny>=1.6.0
requests>=2.33.1
shinyswatch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment